Browse Source

handle errors

pull/7481/head
chrislu 2 weeks ago
parent
commit
990778613b
  1. 135
      weed/s3api/s3api_object_handlers.go

135
weed/s3api/s3api_object_handlers.go

@ -45,6 +45,34 @@ var corsHeaders = []string{
// Package-level to avoid per-call allocations in writeZeroBytes
var zeroBuf = make([]byte, 32*1024)
// StreamError is returned when streaming functions encounter errors.
// It tracks whether an HTTP response has already been written to prevent
// double WriteHeader calls that would create malformed S3 error responses.
type StreamError struct {
// Err is the underlying error
Err error
// ResponseWritten indicates if HTTP headers/status have been written to ResponseWriter
ResponseWritten bool
}
func (e *StreamError) Error() string {
return e.Err.Error()
}
func (e *StreamError) Unwrap() error {
return e.Err
}
// newStreamError creates a StreamError for cases where response hasn't been written yet
func newStreamError(err error) *StreamError {
return &StreamError{Err: err, ResponseWritten: false}
}
// newStreamErrorWithResponse creates a StreamError for cases where response was already written
func newStreamErrorWithResponse(err error) *StreamError {
return &StreamError{Err: err, ResponseWritten: true}
}
func mimeDetect(r *http.Request, dataReader io.Reader) io.ReadCloser {
mimeBuffer := make([]byte, 512)
size, _ := dataReader.Read(mimeBuffer)
@ -686,9 +714,14 @@ func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request)
streamTime = time.Since(tStream)
if err != nil {
glog.Errorf("GetObjectHandler: failed to stream from volume servers: %v", err)
// Try to write error response. The HTTP library will gracefully handle cases
// where headers are already sent (e.g., during streaming errors).
// For early validation errors, this ensures client gets proper error response.
// Check if the streaming function already wrote an HTTP response
var streamErr *StreamError
if errors.As(err, &streamErr) && streamErr.ResponseWritten {
// Response already written (headers + status code), don't write again
// to avoid "superfluous response.WriteHeader call" and malformed S3 error bodies
return
}
// Response not yet written - safe to write S3 error response
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
}
@ -713,10 +746,9 @@ func (s3a *S3ApiServer) streamFromVolumeServers(w http.ResponseWriter, r *http.R
}()
if entry == nil {
// Early validation error: write proper HTTP response
w.WriteHeader(http.StatusInternalServerError)
fmt.Fprintf(w, "Internal Server Error: entry is nil")
return fmt.Errorf("entry is nil")
// Early validation error: write S3-compliant XML error response
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return newStreamErrorWithResponse(fmt.Errorf("entry is nil"))
}
// Get file size
@ -809,21 +841,19 @@ func (s3a *S3ApiServer) streamFromVolumeServers(w http.ResponseWriter, r *http.R
// Safely convert int64 to int for slice indexing - validate BEFORE WriteHeader
// Use MaxInt32 for portability across 32-bit and 64-bit platforms
if offset < 0 || offset > int64(math.MaxInt32) || size < 0 || size > int64(math.MaxInt32) {
// Early validation error: write proper HTTP response BEFORE headers
// Early validation error: write S3-compliant error response
w.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", totalSize))
w.WriteHeader(http.StatusRequestedRangeNotSatisfiable)
fmt.Fprintf(w, "Range too large for platform: offset=%d, size=%d", offset, size)
return fmt.Errorf("range too large for platform: offset=%d, size=%d", offset, size)
s3err.WriteErrorResponse(w, r, s3err.ErrInvalidRange)
return newStreamErrorWithResponse(fmt.Errorf("range too large for platform: offset=%d, size=%d", offset, size))
}
start := int(offset)
end := start + int(size)
// Bounds check (should already be validated, but double-check) - BEFORE WriteHeader
if start < 0 || start > len(entry.Content) || end > len(entry.Content) || end < start {
// Early validation error: write proper HTTP response BEFORE headers
// Early validation error: write S3-compliant error response
w.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", totalSize))
w.WriteHeader(http.StatusRequestedRangeNotSatisfiable)
fmt.Fprintf(w, "Invalid range for inline content: start=%d, end=%d, len=%d)", start, end, len(entry.Content))
return fmt.Errorf("invalid range for inline content: start=%d, end=%d, len=%d", start, end, len(entry.Content))
s3err.WriteErrorResponse(w, r, s3err.ErrInvalidRange)
return newStreamErrorWithResponse(fmt.Errorf("invalid range for inline content: start=%d, end=%d, len=%d", start, end, len(entry.Content)))
}
// Validation passed - now set headers and write
s3a.setResponseHeaders(w, entry, totalSize)
@ -849,10 +879,9 @@ func (s3a *S3ApiServer) streamFromVolumeServers(w http.ResponseWriter, r *http.R
// BUG FIX: If totalSize > 0 but no chunks and no content, this is a data integrity issue
if totalSize > 0 && len(entry.Content) == 0 {
glog.Errorf("streamFromVolumeServers: Data integrity error - entry reports size %d but has no content or chunks", totalSize)
// IMPORTANT: Write error status before returning, since headers haven't been written yet
w.WriteHeader(http.StatusInternalServerError)
fmt.Fprintf(w, "Internal Server Error: data integrity issue (size %d reported but no content available)", totalSize)
return fmt.Errorf("data integrity error: size %d reported but no content available", totalSize)
// Write S3-compliant XML error response
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return newStreamErrorWithResponse(fmt.Errorf("data integrity error: size %d reported but no content available", totalSize))
}
// Empty object - set headers and write status
s3a.setResponseHeaders(w, entry, totalSize)
@ -878,9 +907,9 @@ func (s3a *S3ApiServer) streamFromVolumeServers(w http.ResponseWriter, r *http.R
chunkResolveTime = time.Since(tChunkResolve)
if err != nil {
glog.Errorf("streamFromVolumeServers: failed to resolve chunks: %v", err)
w.WriteHeader(http.StatusInternalServerError)
fmt.Fprintf(w, "Internal Server Error: failed to resolve chunks")
return fmt.Errorf("failed to resolve chunks: %v", err)
// Write S3-compliant XML error response
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return newStreamErrorWithResponse(fmt.Errorf("failed to resolve chunks: %v", err))
}
// Prepare streaming function with simple master client wrapper
@ -901,9 +930,9 @@ func (s3a *S3ApiServer) streamFromVolumeServers(w http.ResponseWriter, r *http.R
streamPrepTime = time.Since(tStreamPrep)
if err != nil {
glog.Errorf("streamFromVolumeServers: failed to prepare stream: %v", err)
w.WriteHeader(http.StatusInternalServerError)
fmt.Fprintf(w, "Internal Server Error: failed to prepare stream")
return fmt.Errorf("failed to prepare stream: %v", err)
// Write S3-compliant XML error response
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return newStreamErrorWithResponse(fmt.Errorf("failed to prepare stream: %v", err))
}
// All validation and preparation successful - NOW set headers and write status
@ -931,10 +960,11 @@ func (s3a *S3ApiServer) streamFromVolumeServers(w http.ResponseWriter, r *http.R
streamExecTime = time.Since(tStreamExec)
if err != nil {
glog.Errorf("streamFromVolumeServers: streamFn failed: %v", err)
} else {
glog.V(4).Infof("streamFromVolumeServers: streamFn completed successfully")
// Streaming error after WriteHeader was called - response already partially written
return newStreamErrorWithResponse(err)
}
return err
glog.V(4).Infof("streamFromVolumeServers: streamFn completed successfully")
return nil
}
// Shared HTTP client for volume server requests (connection pooling)
@ -1019,7 +1049,7 @@ func (s3a *S3ApiServer) streamFromVolumeServersWithSSE(w http.ResponseWriter, r
if totalSize == 0 || suffixLen <= 0 {
w.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", totalSize))
w.WriteHeader(http.StatusRequestedRangeNotSatisfiable)
return fmt.Errorf("invalid suffix range for empty object")
return newStreamErrorWithResponse(fmt.Errorf("invalid suffix range for empty object"))
}
if suffixLen > totalSize {
suffixLen = totalSize
@ -1030,7 +1060,7 @@ func (s3a *S3ApiServer) streamFromVolumeServersWithSSE(w http.ResponseWriter, r
// Set header BEFORE WriteHeader
w.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", totalSize))
w.WriteHeader(http.StatusRequestedRangeNotSatisfiable)
return fmt.Errorf("invalid suffix range")
return newStreamErrorWithResponse(fmt.Errorf("invalid suffix range"))
}
} else {
// Regular range or open-ended range
@ -1053,7 +1083,7 @@ func (s3a *S3ApiServer) streamFromVolumeServersWithSSE(w http.ResponseWriter, r
// Set header BEFORE WriteHeader
w.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", totalSize))
w.WriteHeader(http.StatusRequestedRangeNotSatisfiable)
return fmt.Errorf("invalid range start")
return newStreamErrorWithResponse(fmt.Errorf("invalid range start"))
}
if endOffset >= totalSize {
@ -1064,7 +1094,7 @@ func (s3a *S3ApiServer) streamFromVolumeServersWithSSE(w http.ResponseWriter, r
// Set header BEFORE WriteHeader
w.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", totalSize))
w.WriteHeader(http.StatusRequestedRangeNotSatisfiable)
return fmt.Errorf("invalid range: end before start")
return newStreamErrorWithResponse(fmt.Errorf("invalid range: end before start"))
}
}
@ -1084,18 +1114,18 @@ func (s3a *S3ApiServer) streamFromVolumeServersWithSSE(w http.ResponseWriter, r
customerKey, err := ParseSSECHeaders(r)
if err != nil {
s3err.WriteErrorResponse(w, r, MapSSECErrorToS3Error(err))
return err
return newStreamErrorWithResponse(err)
}
if customerKey == nil {
s3err.WriteErrorResponse(w, r, s3err.ErrSSECustomerKeyMissing)
return fmt.Errorf("SSE-C key required")
return newStreamErrorWithResponse(fmt.Errorf("SSE-C key required"))
}
// Validate key MD5
if entry.Extended != nil {
storedKeyMD5 := string(entry.Extended[s3_constants.AmzServerSideEncryptionCustomerKeyMD5])
if storedKeyMD5 != "" && customerKey.KeyMD5 != storedKeyMD5 {
s3err.WriteErrorResponse(w, r, s3err.ErrAccessDenied)
return fmt.Errorf("SSE-C key mismatch")
return newStreamErrorWithResponse(fmt.Errorf("SSE-C key mismatch"))
}
}
decryptionKey = customerKey
@ -1103,27 +1133,27 @@ func (s3a *S3ApiServer) streamFromVolumeServersWithSSE(w http.ResponseWriter, r
// Extract KMS key from metadata (stored as raw bytes, matching filer behavior)
if entry.Extended == nil {
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return fmt.Errorf("no SSE-KMS metadata")
return newStreamErrorWithResponse(fmt.Errorf("no SSE-KMS metadata"))
}
kmsMetadataBytes := entry.Extended[s3_constants.SeaweedFSSSEKMSKey]
sseKMSKey, err := DeserializeSSEKMSMetadata(kmsMetadataBytes)
if err != nil {
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return err
return newStreamErrorWithResponse(err)
}
decryptionKey = sseKMSKey
case s3_constants.SSETypeS3:
// Extract S3 key from metadata (stored as raw bytes, matching filer behavior)
if entry.Extended == nil {
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return fmt.Errorf("no SSE-S3 metadata")
return newStreamErrorWithResponse(fmt.Errorf("no SSE-S3 metadata"))
}
keyData := entry.Extended[s3_constants.SeaweedFSSSES3Key]
keyManager := GetSSES3KeyManager()
sseS3Key, err := DeserializeSSES3Metadata(keyData, keyManager)
if err != nil {
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return err
return newStreamErrorWithResponse(err)
}
decryptionKey = sseS3Key
}
@ -1157,7 +1187,11 @@ func (s3a *S3ApiServer) streamFromVolumeServersWithSSE(w http.ResponseWriter, r
err := s3a.streamDecryptedRangeFromChunks(r.Context(), w, entry, offset, size, sseType, decryptionKey)
decryptSetupTime = time.Since(tDecryptSetup)
copyTime = decryptSetupTime // Streaming is included in decrypt setup for range-aware path
return err
if err != nil {
// Error after WriteHeader - response already written
return newStreamErrorWithResponse(err)
}
return nil
}
// Full object path: Optimize multipart vs single-part
@ -1191,13 +1225,15 @@ func (s3a *S3ApiServer) streamFromVolumeServersWithSSE(w http.ResponseWriter, r
encryptedReader, streamErr := s3a.getEncryptedStreamFromVolumes(r.Context(), entry)
streamFetchTime = time.Since(tStreamFetch)
if streamErr != nil {
return streamErr
// Error after WriteHeader - response already written
return newStreamErrorWithResponse(streamErr)
}
defer encryptedReader.Close()
iv := entry.Extended[s3_constants.SeaweedFSSSEIV]
if len(iv) == 0 {
return fmt.Errorf("SSE-C IV not found in entry metadata")
// Error after WriteHeader - response already written
return newStreamErrorWithResponse(fmt.Errorf("SSE-C IV not found in entry metadata"))
}
glog.V(2).Infof("SSE-C decryption: IV length=%d, KeyMD5=%s", len(iv), customerKey.KeyMD5)
decryptedReader, err = CreateSSECDecryptedReader(encryptedReader, customerKey, iv)
@ -1227,7 +1263,8 @@ func (s3a *S3ApiServer) streamFromVolumeServersWithSSE(w http.ResponseWriter, r
encryptedReader, streamErr := s3a.getEncryptedStreamFromVolumes(r.Context(), entry)
streamFetchTime = time.Since(tStreamFetch)
if streamErr != nil {
return streamErr
// Error after WriteHeader - response already written
return newStreamErrorWithResponse(streamErr)
}
defer encryptedReader.Close()
@ -1259,14 +1296,16 @@ func (s3a *S3ApiServer) streamFromVolumeServersWithSSE(w http.ResponseWriter, r
encryptedReader, streamErr := s3a.getEncryptedStreamFromVolumes(r.Context(), entry)
streamFetchTime = time.Since(tStreamFetch)
if streamErr != nil {
return streamErr
// Error after WriteHeader - response already written
return newStreamErrorWithResponse(streamErr)
}
defer encryptedReader.Close()
keyManager := GetSSES3KeyManager()
iv, ivErr := GetSSES3IV(entry, sseS3Key, keyManager)
if ivErr != nil {
return fmt.Errorf("failed to get SSE-S3 IV: %w", ivErr)
// Error after WriteHeader - response already written
return newStreamErrorWithResponse(fmt.Errorf("failed to get SSE-S3 IV: %w", ivErr))
}
glog.V(2).Infof("SSE-S3 decryption: KeyID=%s, IV length=%d", sseS3Key.KeyID, len(iv))
decryptedReader, err = CreateSSES3DecryptedReader(encryptedReader, sseS3Key, iv)
@ -1276,7 +1315,8 @@ func (s3a *S3ApiServer) streamFromVolumeServersWithSSE(w http.ResponseWriter, r
if err != nil {
glog.Errorf("SSE decryption error (%s): %v", sseType, err)
return fmt.Errorf("failed to create decrypted reader: %w", err)
// Error after WriteHeader - response already written
return newStreamErrorWithResponse(fmt.Errorf("failed to create decrypted reader: %w", err))
}
// Close the decrypted reader to avoid leaking HTTP bodies
@ -1295,7 +1335,8 @@ func (s3a *S3ApiServer) streamFromVolumeServersWithSSE(w http.ResponseWriter, r
copyTime = time.Since(tCopy)
if copyErr != nil {
glog.Errorf("Failed to copy full object: copied %d bytes: %v", copied, copyErr)
return copyErr
// Error after WriteHeader - response already written
return newStreamErrorWithResponse(copyErr)
}
glog.V(3).Infof("Full object request: copied %d bytes", copied)
return nil

Loading…
Cancel
Save