|
|
@ -236,20 +236,25 @@ func (s3a *S3ApiServer) toFilerUrl(bucket, object string) string { |
|
|
return destUrl |
|
|
return destUrl |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request) { |
|
|
|
|
|
|
|
|
|
|
|
bucket, object := s3_constants.GetBucketAndObject(r) |
|
|
|
|
|
glog.V(3).Infof("GetObjectHandler %s %s", bucket, object) |
|
|
|
|
|
|
|
|
// hasConditionalHeaders checks if the request has any conditional headers
|
|
|
|
|
|
// This is a lightweight check to avoid unnecessary function calls
|
|
|
|
|
|
func (s3a *S3ApiServer) hasConditionalHeaders(r *http.Request) bool { |
|
|
|
|
|
return r.Header.Get(s3_constants.IfMatch) != "" || |
|
|
|
|
|
r.Header.Get(s3_constants.IfNoneMatch) != "" || |
|
|
|
|
|
r.Header.Get(s3_constants.IfModifiedSince) != "" || |
|
|
|
|
|
r.Header.Get(s3_constants.IfUnmodifiedSince) != "" |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
// Handle directory objects with shared logic
|
|
|
|
|
|
if s3a.handleDirectoryObjectRequest(w, r, bucket, object, "GetObjectHandler") { |
|
|
|
|
|
return // Directory object request was handled
|
|
|
|
|
|
|
|
|
// processConditionalHeaders checks conditional headers and writes an error response if a condition fails.
|
|
|
|
|
|
// It returns the result of the check and a boolean indicating if the request has been handled.
|
|
|
|
|
|
func (s3a *S3ApiServer) processConditionalHeaders(w http.ResponseWriter, r *http.Request, bucket, object, handlerName string) (ConditionalHeaderResult, bool) { |
|
|
|
|
|
if !s3a.hasConditionalHeaders(r) { |
|
|
|
|
|
return ConditionalHeaderResult{ErrorCode: s3err.ErrNone}, false |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// Check conditional headers for read operations
|
|
|
|
|
|
result := s3a.checkConditionalHeadersForReads(r, bucket, object) |
|
|
result := s3a.checkConditionalHeadersForReads(r, bucket, object) |
|
|
if result.ErrorCode != s3err.ErrNone { |
|
|
if result.ErrorCode != s3err.ErrNone { |
|
|
glog.V(3).Infof("GetObjectHandler: Conditional header check failed for %s/%s with error %v", bucket, object, result.ErrorCode) |
|
|
|
|
|
|
|
|
glog.V(3).Infof("%s: Conditional header check failed for %s/%s with error %v", handlerName, bucket, object, result.ErrorCode) |
|
|
|
|
|
|
|
|
// For 304 Not Modified responses, include the ETag header
|
|
|
// For 304 Not Modified responses, include the ETag header
|
|
|
if result.ErrorCode == s3err.ErrNotModified && result.ETag != "" { |
|
|
if result.ErrorCode == s3err.ErrNotModified && result.ETag != "" { |
|
|
@ -257,16 +262,41 @@ func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request) |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
s3err.WriteErrorResponse(w, r, result.ErrorCode) |
|
|
s3err.WriteErrorResponse(w, r, result.ErrorCode) |
|
|
|
|
|
return result, true // request handled
|
|
|
|
|
|
} |
|
|
|
|
|
return result, false // request not handled
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request) { |
|
|
|
|
|
|
|
|
|
|
|
bucket, object := s3_constants.GetBucketAndObject(r) |
|
|
|
|
|
glog.V(3).Infof("GetObjectHandler %s %s", bucket, object) |
|
|
|
|
|
|
|
|
|
|
|
// Handle directory objects with shared logic
|
|
|
|
|
|
if s3a.handleDirectoryObjectRequest(w, r, bucket, object, "GetObjectHandler") { |
|
|
|
|
|
return // Directory object request was handled
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// Check conditional headers and handle early return if conditions fail
|
|
|
|
|
|
result, handled := s3a.processConditionalHeaders(w, r, bucket, object, "GetObjectHandler") |
|
|
|
|
|
if handled { |
|
|
return |
|
|
return |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// Check for specific version ID in query parameters
|
|
|
// Check for specific version ID in query parameters
|
|
|
versionId := r.URL.Query().Get("versionId") |
|
|
versionId := r.URL.Query().Get("versionId") |
|
|
|
|
|
|
|
|
|
|
|
var ( |
|
|
|
|
|
destUrl string |
|
|
|
|
|
entry *filer_pb.Entry // Declare entry at function scope for SSE processing
|
|
|
|
|
|
versioningConfigured bool |
|
|
|
|
|
err error |
|
|
|
|
|
) |
|
|
|
|
|
|
|
|
// Check if versioning is configured for the bucket (Enabled or Suspended)
|
|
|
// Check if versioning is configured for the bucket (Enabled or Suspended)
|
|
|
// Note: We need to check this even if versionId is empty, because versioned buckets
|
|
|
// Note: We need to check this even if versionId is empty, because versioned buckets
|
|
|
// handle even "get latest version" requests differently (through .versions directory)
|
|
|
// handle even "get latest version" requests differently (through .versions directory)
|
|
|
versioningConfigured, err := s3a.isVersioningConfigured(bucket) |
|
|
|
|
|
|
|
|
versioningConfigured, err = s3a.isVersioningConfigured(bucket) |
|
|
if err != nil { |
|
|
if err != nil { |
|
|
if err == filer_pb.ErrNotFound { |
|
|
if err == filer_pb.ErrNotFound { |
|
|
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucket) |
|
|
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucket) |
|
|
@ -276,12 +306,8 @@ func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request) |
|
|
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) |
|
|
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) |
|
|
return |
|
|
return |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
glog.V(1).Infof("GetObject: bucket %s, object %s, versioningConfigured=%v, versionId=%s", bucket, object, versioningConfigured, versionId) |
|
|
glog.V(1).Infof("GetObject: bucket %s, object %s, versioningConfigured=%v, versionId=%s", bucket, object, versioningConfigured, versionId) |
|
|
|
|
|
|
|
|
var destUrl string |
|
|
|
|
|
var entry *filer_pb.Entry // Declare entry at function scope for SSE processing
|
|
|
|
|
|
|
|
|
|
|
|
if versioningConfigured { |
|
|
if versioningConfigured { |
|
|
// Handle versioned GET - all versions are stored in .versions directory
|
|
|
// Handle versioned GET - all versions are stored in .versions directory
|
|
|
var targetVersionId string |
|
|
var targetVersionId string |
|
|
@ -352,6 +378,7 @@ func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request) |
|
|
originalRangeHeader := r.Header.Get("Range") |
|
|
originalRangeHeader := r.Header.Get("Range") |
|
|
var sseObject = false |
|
|
var sseObject = false |
|
|
|
|
|
|
|
|
|
|
|
// Optimization: Reuse already-fetched entry to avoid redundant metadata fetches
|
|
|
if versioningConfigured { |
|
|
if versioningConfigured { |
|
|
// For versioned objects, reuse the already-fetched entry
|
|
|
// For versioned objects, reuse the already-fetched entry
|
|
|
objectEntryForSSE = entry |
|
|
objectEntryForSSE = entry |
|
|
@ -362,7 +389,11 @@ func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request) |
|
|
objectEntryForSSE = result.Entry |
|
|
objectEntryForSSE = result.Entry |
|
|
glog.V(3).Infof("GetObjectHandler: Reusing entry from conditional header check for %s/%s", bucket, object) |
|
|
glog.V(3).Infof("GetObjectHandler: Reusing entry from conditional header check for %s/%s", bucket, object) |
|
|
} else { |
|
|
} else { |
|
|
// No conditional headers were checked, fetch entry for SSE processing
|
|
|
|
|
|
|
|
|
// Fetch entry for SSE processing
|
|
|
|
|
|
// This is needed for all SSE types (SSE-C, SSE-KMS, SSE-S3) to:
|
|
|
|
|
|
// 1. Detect encryption from object metadata (SSE-KMS/SSE-S3 don't send headers on GET)
|
|
|
|
|
|
// 2. Add proper response headers
|
|
|
|
|
|
// 3. Handle Range requests on encrypted objects
|
|
|
var fetchErr error |
|
|
var fetchErr error |
|
|
objectEntryForSSE, fetchErr = s3a.fetchObjectEntry(bucket, object) |
|
|
objectEntryForSSE, fetchErr = s3a.fetchObjectEntry(bucket, object) |
|
|
if fetchErr != nil { |
|
|
if fetchErr != nil { |
|
|
@ -415,27 +446,26 @@ func (s3a *S3ApiServer) HeadObjectHandler(w http.ResponseWriter, r *http.Request |
|
|
return // Directory object request was handled
|
|
|
return // Directory object request was handled
|
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// Check conditional headers for read operations
|
|
|
|
|
|
result := s3a.checkConditionalHeadersForReads(r, bucket, object) |
|
|
|
|
|
if result.ErrorCode != s3err.ErrNone { |
|
|
|
|
|
glog.V(3).Infof("HeadObjectHandler: Conditional header check failed for %s/%s with error %v", bucket, object, result.ErrorCode) |
|
|
|
|
|
|
|
|
|
|
|
// For 304 Not Modified responses, include the ETag header
|
|
|
|
|
|
if result.ErrorCode == s3err.ErrNotModified && result.ETag != "" { |
|
|
|
|
|
w.Header().Set("ETag", result.ETag) |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
s3err.WriteErrorResponse(w, r, result.ErrorCode) |
|
|
|
|
|
|
|
|
// Check conditional headers and handle early return if conditions fail
|
|
|
|
|
|
result, handled := s3a.processConditionalHeaders(w, r, bucket, object, "HeadObjectHandler") |
|
|
|
|
|
if handled { |
|
|
return |
|
|
return |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// Check for specific version ID in query parameters
|
|
|
// Check for specific version ID in query parameters
|
|
|
versionId := r.URL.Query().Get("versionId") |
|
|
versionId := r.URL.Query().Get("versionId") |
|
|
|
|
|
|
|
|
|
|
|
var ( |
|
|
|
|
|
destUrl string |
|
|
|
|
|
entry *filer_pb.Entry // Declare entry at function scope for SSE processing
|
|
|
|
|
|
versioningConfigured bool |
|
|
|
|
|
err error |
|
|
|
|
|
) |
|
|
|
|
|
|
|
|
// Check if versioning is configured for the bucket (Enabled or Suspended)
|
|
|
// Check if versioning is configured for the bucket (Enabled or Suspended)
|
|
|
// Note: We need to check this even if versionId is empty, because versioned buckets
|
|
|
// Note: We need to check this even if versionId is empty, because versioned buckets
|
|
|
// handle even "get latest version" requests differently (through .versions directory)
|
|
|
// handle even "get latest version" requests differently (through .versions directory)
|
|
|
versioningConfigured, err := s3a.isVersioningConfigured(bucket) |
|
|
|
|
|
|
|
|
versioningConfigured, err = s3a.isVersioningConfigured(bucket) |
|
|
if err != nil { |
|
|
if err != nil { |
|
|
if err == filer_pb.ErrNotFound { |
|
|
if err == filer_pb.ErrNotFound { |
|
|
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucket) |
|
|
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucket) |
|
|
@ -446,9 +476,6 @@ func (s3a *S3ApiServer) HeadObjectHandler(w http.ResponseWriter, r *http.Request |
|
|
return |
|
|
return |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
var destUrl string |
|
|
|
|
|
var entry *filer_pb.Entry // Declare entry at function scope for SSE processing
|
|
|
|
|
|
|
|
|
|
|
|
if versioningConfigured { |
|
|
if versioningConfigured { |
|
|
// Handle versioned HEAD - all versions are stored in .versions directory
|
|
|
// Handle versioned HEAD - all versions are stored in .versions directory
|
|
|
var targetVersionId string |
|
|
var targetVersionId string |
|
|
@ -525,7 +552,10 @@ func (s3a *S3ApiServer) HeadObjectHandler(w http.ResponseWriter, r *http.Request |
|
|
objectEntryForSSE = result.Entry |
|
|
objectEntryForSSE = result.Entry |
|
|
glog.V(3).Infof("HeadObjectHandler: Reusing entry from conditional header check for %s/%s", bucket, object) |
|
|
glog.V(3).Infof("HeadObjectHandler: Reusing entry from conditional header check for %s/%s", bucket, object) |
|
|
} else { |
|
|
} else { |
|
|
// No conditional headers were checked, fetch entry for SSE processing
|
|
|
|
|
|
|
|
|
// Fetch entry for SSE processing
|
|
|
|
|
|
// This is needed for all SSE types (SSE-C, SSE-KMS, SSE-S3) to:
|
|
|
|
|
|
// 1. Detect encryption from object metadata (SSE-KMS/SSE-S3 don't send headers on HEAD)
|
|
|
|
|
|
// 2. Add proper response headers
|
|
|
var fetchErr error |
|
|
var fetchErr error |
|
|
objectEntryForSSE, fetchErr = s3a.fetchObjectEntry(bucket, object) |
|
|
objectEntryForSSE, fetchErr = s3a.fetchObjectEntry(bucket, object) |
|
|
if fetchErr != nil { |
|
|
if fetchErr != nil { |
|
|
|